package com.example.demo.stream.join;

import com.example.demo.entity.UserBean;
import org.apache.commons.collections.MapUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Demonstrates the different ways Flink can combine DataStreams:
 * union, window join, connect and window coGroup.
 *
 * <p>The variant to run is selected with {@code --type <n>} on the command line
 * (1 = union, 2 = join, 3 = connect, 4 = coGroup). When the flag is absent,
 * variant 4 (coGroup) is executed.
 */
public class StreamJoinMain {

    /** Variant executed when no {@code --type} argument is supplied (4 = coGroup). */
    private static final String DEFAULT_TYPE = "4";

    /** Command-line flag whose following argument selects the variant. */
    private static final String TYPE_FLAG = "--type";

    /** Execution environment shared by every demo pipeline in this class. */
    public static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /** First sample stream of user events (productID1). */
    public static DataStream<UserBean> textStream1 = env.fromElements(
            new UserBean("userID1", 1293914003, "browse1", "productID1", 8),
            new UserBean("userID2", 1292984002, "click1", "productID1", 10),
            new UserBean("userID3", 1293384003, "bob1", "productID1", 10)
    );

    /** Second sample stream of user events (productID2). */
    public static DataStream<UserBean> textStream2 = env.fromElements(
            new UserBean("userID1", 1293984003, "browse2", "productID2", 8),
            new UserBean("userID2", 1293924002, "click2", "productID2", 10),
            new UserBean("userID3", 1293914103, "bob2", "productID2", 10)
    );

    /** Third sample stream of user events (productID3); additionally contains userID4. */
    public static DataStream<UserBean> textStream3 = env.fromElements(
            new UserBean("userID1", 1293984003, "browse3", "productID3", 8),
            new UserBean("userID2", 1293924002, "click3", "productID3", 10),
            new UserBean("userID3", 1293914103, "bob3", "productID3", 10),
            new UserBean("userID4", 1293914103, "bob3", "productID3", 10)
    );

    /** Sample stream of single-field tuples, used by {@link #connect()}. */
    public static DataStream<Tuple1<String>> dataStream1 = env.fromElements(
            Tuple1.of("flink"),
            Tuple1.of("spark"),
            Tuple1.of("hadoop"),
            Tuple1.of("userID1")
    );

    /** Sample stream of (userID, text) tuples, used by {@link #coGroup()}. */
    public static DataStream<Tuple2<String, String>> dataStream2 = env.fromElements(
            Tuple2.of("userID1", "flink"),
            Tuple2.of("userID2", "spark"),
            Tuple2.of("userID3", "hadoop"),
            Tuple2.of("userID4", "userID1")
    );

    /**
     * Builds the selected demo pipeline and executes it with parallelism 1.
     *
     * <ul>
     *   <li>type 1: union</li>
     *   <li>type 2: join</li>
     *   <li>type 3: connect</li>
     *   <li>type 4: coGroup (default)</li>
     * </ul>
     *
     * @param args command-line arguments; an optional {@code --type <n>} pair selects the variant
     * @throws IllegalArgumentException if the selected type is not one of 1-4
     * @throws Exception                if job execution fails
     */
    public static void main(String[] args) throws Exception {
        String type = resolveType(args);
        env.setParallelism(1);
        switch (type) {
            case "1":
                union();
                break;
            case "2":
                join();
                break;
            case "3":
                connect();
                break;
            case "4":
                coGroup();
                break;
            default:
                throw new IllegalArgumentException("连接类型错误！");
        }
        env.execute("join job demo");
    }

    /**
     * Extracts the value that follows the {@code --type} flag.
     *
     * @param args raw command-line arguments, may be {@code null} or empty
     * @return the argument following {@code --type}, or {@link #DEFAULT_TYPE} when the flag
     *         is missing or has no value after it
     */
    private static String resolveType(String[] args) {
        if (args != null) {
            // Stop one short of the end: the flag needs a value after it.
            for (int i = 0; i < args.length - 1; i++) {
                if (TYPE_FLAG.equals(args[i])) {
                    return args[i + 1];
                }
            }
        }
        return DEFAULT_TYPE;
    }


    /**
     * union: merges several streams into one. All streams must have the same element type;
     * no join condition is needed.
     *
     * <p>Every element of the three inputs is printed (ten in total). The element format
     * below follows the original author's recorded output; each line is printed with the
     * {@code print} label in front. NOTE(review): the relative order of elements coming
     * from different inputs is presumably not guaranteed - confirm before relying on it.
     * <pre>
     * UserBean(userID=userID1, eventTime=1293914003, eventType=browse1, productID=productID1, productPrice=8)
     * UserBean(userID=userID2, eventTime=1292984002, eventType=click1, productID=productID1, productPrice=10)
     * UserBean(userID=userID3, eventTime=1293384003, eventType=bob1, productID=productID1, productPrice=10)
     * UserBean(userID=userID1, eventTime=1293984003, eventType=browse2, productID=productID2, productPrice=8)
     * UserBean(userID=userID2, eventTime=1293924002, eventType=click2, productID=productID2, productPrice=10)
     * UserBean(userID=userID3, eventTime=1293914103, eventType=bob2, productID=productID2, productPrice=10)
     * UserBean(userID=userID1, eventTime=1293984003, eventType=browse3, productID=productID3, productPrice=8)
     * UserBean(userID=userID2, eventTime=1293924002, eventType=click3, productID=productID3, productPrice=10)
     * UserBean(userID=userID3, eventTime=1293914103, eventType=bob3, productID=productID3, productPrice=10)
     * UserBean(userID=userID4, eventTime=1293914103, eventType=bob3, productID=productID3, productPrice=10)
     * </pre>
     */
    private static void union() {
        // A stream of another element type (e.g. dataStream1) cannot be added to this union.
        textStream1.union(textStream2, textStream3).print("union输出流==> ");
    }


    /**
     * join: correlates streams on a key inside a window. This behaves like a database inner
     * join: only elements that find a match are emitted. The joined streams may have
     * different element types.
     *
     * <p>Here textStream1 is joined with textStream2 on userID, and that result is joined
     * with textStream3 on the userID of the first list entry, so each emitted list holds
     * one bean from every stream. userID4 only exists in textStream3 and is dropped.
     *
     * <p>NOTE(review): the sources are not given timestamps or watermarks although
     * event-time session windows are used; the sample output below is the one recorded by
     * the original author - confirm it on the Flink version in use.
     * <pre>
     * join输出流==> > [UserBean(userID=userID2, eventTime=1292984002, eventType=click1, productID=productID1, productPrice=10), UserBean(userID=userID2, eventTime=1293924002, eventType=click2, productID=productID2, productPrice=10), UserBean(userID=userID2, eventTime=1293924002, eventType=click3, productID=productID3, productPrice=10)]
     * join输出流==> > [UserBean(userID=userID3, eventTime=1293384003, eventType=bob1, productID=productID1, productPrice=10), UserBean(userID=userID3, eventTime=1293914103, eventType=bob2, productID=productID2, productPrice=10), UserBean(userID=userID3, eventTime=1293914103, eventType=bob3, productID=productID3, productPrice=10)]
     * join输出流==> > [UserBean(userID=userID1, eventTime=1293914003, eventType=browse1, productID=productID1, productPrice=8), UserBean(userID=userID1, eventTime=1293984003, eventType=browse2, productID=productID2, productPrice=8), UserBean(userID=userID1, eventTime=1293984003, eventType=browse3, productID=productID3, productPrice=8)]
     * </pre>
     */
    private static void join() {
        textStream1.join(textStream2)
                // Key of the left-hand stream.
                .where(UserBean::getUserID)
                // Must equal the key of the right-hand stream.
                .equalTo(UserBean::getUserID)
                // Evaluate the join inside event-time session windows.
                .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
                // Combine every matching pair into a two-element list.
                .apply(new JoinFunction<UserBean, UserBean, List<UserBean>>() {
                    @Override
                    public List<UserBean> join(UserBean first, UserBean second) throws Exception {
                        List<UserBean> pair = new ArrayList<>(2);
                        pair.add(first);
                        pair.add(second);
                        return pair;
                    }
                })
                .join(textStream3)
                .where(item -> item.get(0).getUserID())
                .equalTo(UserBean::getUserID)
                .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
                .apply(new FlatJoinFunction<List<UserBean>, UserBean, List<UserBean>>() {
                    @Override
                    public void join(List<UserBean> first, UserBean second, Collector<List<UserBean>> out) throws Exception {
                        // Emit a copy instead of appending to 'first': the same left-hand list is
                        // passed in again for every further match, so mutating it would leak
                        // earlier matches into later results.
                        List<UserBean> joined = new ArrayList<>(first.size() + 1);
                        joined.addAll(first);
                        joined.add(second);
                        out.collect(joined);
                    }
                })
                .print("join输出流==> ");
    }


    /**
     * connect: connects two streams, which may have different element types, and maps both
     * sides to one common output type.
     *
     * <p>Each UserBean becomes (eventType, bean.toString()); each Tuple1 becomes (f0, f0).
     */
    private static void connect() {
        textStream1.connect(dataStream1)
                .map(new CoMapFunction<UserBean, Tuple1<String>, Tuple2<String, String>>() {

                    /** Converts an element of the first (UserBean) input. */
                    @Override
                    public Tuple2<String, String> map1(UserBean value) throws Exception {
                        return Tuple2.of(value.getEventType(), value.toString());
                    }

                    /** Converts an element of the second (Tuple1) input. */
                    @Override
                    public Tuple2<String, String> map2(Tuple1<String> value) throws Exception {
                        return Tuple2.of(value.f0, value.f0);
                    }
                }).print("connect输出流==>");

    }

    /**
     * coGroup: similar to an outer join. Elements sharing a key are handed to the function
     * together in the two iterables, and elements without a partner are still delivered on
     * their own. The two streams may have different element types.
     *
     * <p>For every invocation up to two maps are emitted: one with the UserBean side
     * (userID to bean text) and one with the tuple side. A tuple whose key matches the last
     * UserBean seen is stored as "bean text + space + tuple text"; any other tuple is stored
     * as its own text followed by a running counter.
     */
    private static void coGroup() {
        textStream1.coGroup(dataStream2)
                .where(UserBean::getUserID)
                .equalTo(item -> item.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(3)))
                // Count trigger with a threshold of one element.
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<UserBean, Tuple2<String, String>, Map<String, String>>() {
                    @Override
                    public void coGroup(Iterable<UserBean> first, Iterable<Tuple2<String, String>> second, Collector<Map<String, String>> out) throws Exception {
                        Map<String, String> beansByUser = new HashMap<>();
                        Map<String, String> tuplesByUser = new HashMap<>();
                        // userID of the last bean seen; stays null when 'first' is empty.
                        String key = null;
                        // Starts at 1 and is incremented before use, so the first tuple gets suffix 2.
                        int i = 1;
                        for (UserBean userBean : first) {
                            key = userBean.getUserID();
                            beansByUser.put(userBean.getUserID(), userBean.toString());
                        }
                        for (Tuple2<String, String> tuple : second) {
                            i++;
                            if (tuple.f0.equals(key)) {
                                // Both sides present: combine bean text and tuple text.
                                tuplesByUser.put(key, beansByUser.get(key) + " " + tuple.toString());
                            } else {
                                // No bean for this key: emit the tuple with the counter appended.
                                tuplesByUser.put(tuple.f0, tuple.toString() + i);
                            }
                        }
                        if (MapUtils.isNotEmpty(beansByUser)) {
                            out.collect(beansByUser);
                        }
                        if (MapUtils.isNotEmpty(tuplesByUser)) {
                            out.collect(tuplesByUser);
                        }
                    }
                }).print("coGroup输出流==> ");
    }


}

